package com.heima.behavior.listener;

import com.alibaba.fastjson.JSON;
import com.heima.behavior.service.ApFollowBehaviorService;
import com.heima.common.constants.message.FollowBehaviorConstants;
import com.heima.model.behavior.dtos.FollowBehaviorDto;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

import java.util.Optional;

/**
 * Kafka consumer listener that records follow behavior.
 *
 * <p>Each message received on {@link FollowBehaviorConstants#FOLLOW_BEHAVIOR_TOPIC} is
 * deserialized from JSON into a {@link FollowBehaviorDto} and handed to
 * {@link ApFollowBehaviorService#follow}.
 */
@Component
@Slf4j
public class ApFollowBehaviorListener {

    @Autowired
    private ApFollowBehaviorService apFollowBehaviorService;

    /**
     * Handles one follow-behavior message.
     *
     * <p>Records that carry no usable payload (a {@code null} record, a {@code null} or blank
     * value, or JSON that deserializes to {@code null}) are logged and skipped, so the service
     * is never invoked with a {@code null} DTO. Exceptions thrown while parsing or by the
     * service are not caught here and propagate to the caller.
     *
     * @param consumerRecord the record delivered by the listener container; its value is
     *                       expected to be the JSON form of a {@link FollowBehaviorDto}
     */
    @KafkaListener(topics = FollowBehaviorConstants.FOLLOW_BEHAVIOR_TOPIC)
    public void msgReceive(ConsumerRecord<String,String> consumerRecord){
        if (consumerRecord == null) {
            return;
        }

        String payload = consumerRecord.value();
        if (payload == null || payload.trim().isEmpty()) {
            // Nothing to deserialize (e.g. a tombstone record); skip instead of passing null on.
            log.warn("Skipping follow behavior message with empty payload, topic={}, partition={}, offset={}",
                    consumerRecord.topic(), consumerRecord.partition(), consumerRecord.offset());
            return;
        }

        log.info("开始执行关注行为记录........");
        FollowBehaviorDto followBehaviorDto = JSON.parseObject(payload, FollowBehaviorDto.class);
        if (followBehaviorDto == null) {
            // A payload such as the JSON literal "null" deserializes to null.
            log.warn("Skipping follow behavior message that deserialized to null, topic={}, partition={}, offset={}",
                    consumerRecord.topic(), consumerRecord.partition(), consumerRecord.offset());
            return;
        }

        apFollowBehaviorService.follow(followBehaviorDto);
        log.info("完成关注行为记录........");
    }
}
